# Source code for hysop.core.memory.mempool

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import math

from abc import ABCMeta, abstractmethod
from hysop.constants import __VERBOSE__, __DEBUG__
from hysop.backend import __HAS_OPENCL_BACKEND__, __HAS_CUDA_BACKEND__
from hysop.tools.units import bytes2str, time2str
from hysop.tools.htypes import check_instance
from hysop.tools.contexts import Timer
from hysop.core.memory.mem_utils import memory_repport, virtual_memory
from hysop.core.memory.buffer import Buffer, PooledBuffer
from hysop.core.memory.allocator import AllocatorBase

if __HAS_OPENCL_BACKEND__:
    from pyopencl.tools import bitlog2
elif __HAS_CUDA_BACKEND__:
    from pycuda.tools import bitlog2
else:

    def bitlog2(x):
        """
        Pure python fallback used when neither pyopencl nor pycuda is available.

        Returns floor(log2(x)) for a strictly positive integer x, i.e. the
        number of right shifts needed to reduce x to one.
        """
        assert x > 0
        exponent = -1
        while x:
            x >>= 1
            exponent += 1
        return exponent


class MemoryPool(metaclass=ABCMeta):
    """
    pyopencl/pycuda like memory pool extended to be compatible for all backends.

    Requested sizes are rounded up to a "bin": a bin number packs
    ``bitlog2(size)`` together with the ``mantissa_bits`` bits that follow
    the leading one of ``size`` (see :meth:`bin_number` and :meth:`alloc_size`).
    Blocks released through :meth:`free` are kept in their bin and handed back
    to later requests falling in the same bin, unless :meth:`stop_holding`
    was called. Subclasses must implement :meth:`_wrap_buffer`.
    """

    def __new__(
        cls,
        name,
        allocator,
        max_alloc_bytes=None,
        mantissa_bits=4,
        verbose=None,
        **kwds,
    ):
        # Pool specific arguments are consumed by __init__, only the extra
        # keyword arguments are forwarded up the MRO.
        return super().__new__(cls, **kwds)

    def __init__(
        self,
        name,
        allocator,
        max_alloc_bytes=None,
        mantissa_bits=4,
        verbose=None,
        **kwds,
    ):
        """
        Builds a MemoryPool from an allocator.
        Provides an allocator like interface.

        Parameters
        ----------
        name: str
            the name of this pool for logging purposes.
        allocator: hysop.core.memory.AllocatorBase
            allocator used by this memory pool, must be an immediate
            (non-deferred) allocator.
        max_alloc_bytes: int, optional
            maximum number of bytes this pool will try to allocate before
            raising a MemoryError. When None (or 0) the default value is:
                - 80% of ``virtual_memory().total`` if the allocator is on host,
                - None (no limit) otherwise.
        mantissa_bits: int, optional
            subdivision bits of power of two allocations (default 4).
            Higher values mean more bins (less memory waste) but less
            buffer reuse.
        verbose: bool, optional
            turn on or off pool messages. Any non-bool value falls back to
            ``hysop.constants.__DEBUG__``.

        Raises
        ------
        RuntimeError
            if the allocator is deferred.

        Notes
        -----
        An allocator that fails to allocate memory should raise a MemoryError
        for the pool to work correctly.
        Some allocators may fail to raise a MemoryError when there is no more
        memory left and will just trigger a SIGKILL from the operating system
        or deadlock instead. To avoid such situations, put an artificial
        software allocation limit through the max_alloc_bytes parameter.
        Exceeding this allocation limit will throw a proper MemoryError.

        Examples:
            *Host allocation on ubuntu 16.04 => SIGKILL
            *OpenCl AMD mesa open source driver on ubuntu 16.04 => deadlock
            *OpenCl Nvidia 375.20 driver on ubuntu 16.04 => work as expected
        """
        super().__init__(**kwds)
        check_instance(allocator, AllocatorBase)

        default_limit = (
            int(0.80 * virtual_memory().total) if allocator.is_on_host() else None
        )
        max_alloc_bytes = max_alloc_bytes or default_limit
        verbose = verbose if isinstance(verbose, bool) else __DEBUG__

        check_instance(name, str)
        check_instance(mantissa_bits, int)
        check_instance(max_alloc_bytes, int, allow_none=True)
        check_instance(verbose, bool)

        self.name = name.strip()
        self.allocator = allocator
        # Honour the requested verbosity (it used to be forced to True by
        # `verbose or True`, printing every single pool operation).
        self.verbose = verbose

        # bin number -> list of held (unused) raw blocks
        self.bin_nr_to_bin = {}
        # statistics key (bitlog2 of block size) -> PoolAllocationStatistics
        self.alloc_statistics = {}

        self.mantissa_bits = mantissa_bits
        self.mantissa_mask = (1 << mantissa_bits) - 1

        self.allocated_bytes = 0
        self.max_alloc_bytes = max_alloc_bytes

        if self.allocator.is_deferred:
            msg = (
                "Memory pools expect non-deferred "
                "semantics from their allocators. You passed a deferred "
                "allocator, i.e. an allocator whose allocations can turn out to "
                "be unavailable long after allocation."
            )
            raise RuntimeError(msg)

        self.active_blocks = 0
        self.stop_holding_flag = False

    @abstractmethod
    def _wrap_buffer(self, buf, alloc_sz, size, alignment):
        """
        Wrap allocated buffer into a PooledBuffer.
        """
        pass

    def may_alloc(self, size):
        """
        Return true if this pool may allocate a buffer of given size (in bytes).
        Only the logical bound is checked (max_alloc_bytes).
        """
        alloc_sz = self.alloc_size(self.bin_number(size))
        return self._may_alloc(alloc_sz)

    def _may_alloc(self, alloc_sz):
        # True when allocating alloc_sz more bytes stays within max_alloc_bytes.
        if self.max_alloc_bytes is None:
            return True
        return self.allocated_bytes + alloc_sz <= self.max_alloc_bytes

    def bin_number(self, size):
        """
        Returns the bin number in which a buffer of given size would be put.

        The bin number is ``bitlog2(size) << mantissa_bits`` or-ed with the
        ``mantissa_bits`` bits of size that follow its leading one.
        """
        l = bitlog2(size)
        mantissa_bits = self.mantissa_bits
        if l >= mantissa_bits:
            shifted = size >> (l - mantissa_bits)
        else:
            shifted = size << (mantissa_bits - l)
        # the leading one of size must land on bit `mantissa_bits`
        assert not (size and (shifted & (1 << mantissa_bits)) == 0)
        chopped = shifted & self.mantissa_mask
        return l << mantissa_bits | chopped

    def alloc_size(self, bin_nr):
        """
        Compute the real allocation size given a bin number.
        This is the largest size mapped to that bin, so
        alloc_size(bin_number(x)) is always >= x.
        """
        mantissa_bits = self.mantissa_bits
        exponent = bin_nr >> mantissa_bits
        mantissa = bin_nr & self.mantissa_mask

        exp_minus_mbits = exponent - mantissa_bits
        if exp_minus_mbits >= 0:
            # low bits below the mantissa are all set to one
            ones = (1 << exp_minus_mbits) - 1
            head = ((1 << mantissa_bits) | mantissa) << exp_minus_mbits
        else:
            ones = 0
            head = ((1 << mantissa_bits) | mantissa) >> -exp_minus_mbits

        assert not (ones & head)
        return head | ones

    def stop_holding(self):
        """
        Tells the pool to stop holding back freed buffers and directly
        free all unused buffers.
        """
        self.stop_holding_flag = True
        self.free_held()

    def free_held(self):
        """
        Free all unused held buffers but does not set the stop_holding_flag flag.
        """
        for _ in self._try_to_free_memory():
            pass

    @property
    def held_blocks(self):
        """
        Returns the number of held blocks.
        """
        return sum(len(bin_list) for bin_list in self.bin_nr_to_bin.values())

    def header(self):
        """
        Return the header of this pool for logging purposes.
        """
        return f"[{self.name}]"

    def allocate_aligned(self, size, alignment):
        """
        Same as allocate for a memory pool.
        """
        return self.allocate(nbytes=size, alignment=alignment)

    def _allocate_block(self, alloc_sz, statistic):
        """
        Ask the allocator for a new block of alloc_sz bytes and record it.
        A MemoryError raised by the allocator propagates and leaves the
        pool counters untouched.
        """
        with Timer() as t:
            result = self.allocator(alloc_sz)
        self.active_blocks += 1
        self.allocated_bytes += alloc_sz
        statistic.push_alloc(alloc_sz, t.interval)
        return result

    def _format_limits(self):
        """
        Format current/max/available byte counters, one indented line each.
        Only meaningful when max_alloc_bytes is not None.
        """
        prefix = " " * len(self.header())
        allocated_bytes = self.allocated_bytes
        max_alloc_bytes = self.max_alloc_bytes
        available = max_alloc_bytes - allocated_bytes
        return (
            f"\n{prefix} *current {bytes2str(allocated_bytes, decimal=False)}"
            f"\n{prefix} *max {bytes2str(max_alloc_bytes, decimal=False)}"
            f"\n{prefix} *available {bytes2str(available, decimal=False)}"
        )

    def allocate(self, nbytes, alignment=None):
        """
        Allocate a buffer of size nbytes and given alignment.

        The real size of the underlying block may be greater and wasted
        memory mostly depends on the configured mantissa_bits (default is 4).
        A held block of the matching bin is reused when available, otherwise
        a new block is requested from the allocator. If that fails (or would
        exceed max_alloc_bytes), held blocks are freed one by one and the
        allocation is retried.

        Parameters
        ----------
        nbytes: int
            requested number of bytes.
        alignment: int, optional
            required alignment in bytes, must be a power of two (default 1).

        Returns
        -------
        The object built by _wrap_buffer (a
        hysop.core.memory.buffer.PooledBuffer for hysop pools). While a
        reference to the returned object is kept, it won't return to the pool.

        Raises
        ------
        MemoryError
            if no block could be obtained, even after freeing all held blocks.
        """
        alignment = alignment or 1
        assert alignment > 0
        assert not (alignment & alignment - 1), "alignment is not a power of 2."

        # we may need more memory to align returned ptr
        min_alloc_size = nbytes + alignment - 1

        # maybe bin allocation size will be sufficient
        bin_nr = self.bin_number(nbytes)
        alloc_sz = self.alloc_size(bin_nr)

        # else we choose a bin that can provide min_alloc_size bytes
        if alloc_sz < min_alloc_size:
            bin_nr = self.bin_number(min_alloc_size)
            alloc_sz = self.alloc_size(bin_nr)

        bin_list = self.bin_nr_to_bin.setdefault(bin_nr, [])
        assert self.bin_number(alloc_sz) == bin_nr

        size = nbytes
        # Statistics are keyed by the size of the underlying block, the same
        # key free() and _try_to_free_memory() use. Keying by the requested
        # size diverged when alignment pushed the request into a bigger bin.
        # Without alignment both keys are identical.
        stat_nr = bitlog2(alloc_sz)
        statistic = self.alloc_statistics.setdefault(
            stat_nr, PoolAllocationStatistics()
        )
        verbose = self.verbose

        # 1) reuse a held block of the same bin
        if bin_list:
            if verbose:
                msg = "{} allocation request of size {} served from bin {}."
                msg = msg.format(self.header(), bytes2str(size, decimal=False), bin_nr)
                print(msg)
            self.active_blocks += 1
            statistic.push_reuse(alloc_sz)
            return self._wrap_buffer(bin_list.pop(), alloc_sz, size, alignment)

        # 2) allocate a brand new block
        if self._may_alloc(alloc_sz):
            try:
                result = self._allocate_block(alloc_sz, statistic)
            except MemoryError:
                if verbose:
                    msg = "{} allocation of size {} failed, freeing unused blocks."
                    msg = msg.format(self.header(), bytes2str(alloc_sz, decimal=False))
                    print(msg)
            else:
                if verbose:
                    msg = "{} allocated new block of size {} to serve a {} request."
                    msg = msg.format(
                        self.header(),
                        bytes2str(alloc_sz, decimal=False),
                        bytes2str(size, decimal=False),
                    )
                    print(msg)
                return self._wrap_buffer(result, alloc_sz, size, alignment)
        else:
            prefix = " " * len(self.header())
            msg = "{} allocating {} would exceed pool max allocation limits:".format(
                self.header(), bytes2str(alloc_sz, decimal=False)
            )
            msg += self._format_limits()
            msg += f"\n{prefix} => trying to free unused blocks before allocation."
            print(msg)

        # 3) free held blocks and retry
        freed_bytes = 0
        try_last_alloc = False
        for fb in self._try_to_free_memory():
            may_alloc = self._may_alloc(alloc_sz)
            if fb is None:
                # all unused blocks were freed, last chance to allocate
                if may_alloc:
                    try_last_alloc = True
                else:
                    msg = (
                        "{} all unused blocks freed but allocation would "
                        "still exceed pool limits:".format(self.header())
                    )
                    msg += self._format_limits()
                    print(msg)
            else:
                freed_bytes += fb

            if ((freed_bytes >= alloc_sz) and may_alloc) or try_last_alloc:
                try:
                    result = self._allocate_block(alloc_sz, statistic)
                except MemoryError:
                    continue
                if verbose:
                    msg = "{} allocation succeded after block destruction."
                    msg = msg.format(self.header())
                    print(msg)
                # Previously returned a raw tuple here instead of a buffer.
                return self._wrap_buffer(result, alloc_sz, size, alignment)

        # 4) give up
        msg = "{} no more free blocks left, allocation failed."
        msg = msg.format(self.header())
        print(msg)
        print()
        print(memory_repport())
        if statistic.nallocs == 0:
            self.alloc_statistics.pop(stat_nr)
        self.print_allocation_report()
        raise MemoryError(msg)

    __call__ = allocate

    def free(self, buf, size):
        """
        Give a block back to the pool.

        The block is held in bin ``bin_number(size)`` for later reuse, or
        released to the allocator right away once stop_holding() was called.

        Parameters
        ----------
        buf:
            raw block previously obtained from this pool's allocator.
        size: int
            size of the block in bytes. Presumably the block allocation size
            (alloc_sz), since it is subtracted from allocated_bytes, which
            allocate() incremented by alloc_sz.
        """
        bin_nr = self.bin_number(size)
        stat_nr = bitlog2(size)
        statistics = self.alloc_statistics[stat_nr]
        self.active_blocks -= 1
        # The block is always accounted as "returned" first, even when it is
        # released immediately, so that the invariants checked in
        # allocation_report keep holding after stop_holding().
        statistics.push_return(size)
        if not self.stop_holding_flag:
            self.bin_nr_to_bin.setdefault(bin_nr, []).append(buf)
            if self.verbose:
                msg = "{} block of size {} returned to bin {}."
                msg = msg.format(self.header(), bytes2str(size, decimal=False), bin_nr)
                print(msg)
        else:
            if self.verbose:
                msg = "{} freeing block of size {} in bin {}."
                msg = msg.format(self.header(), bytes2str(size, decimal=False), bin_nr)
                print(msg)
            with Timer() as t:
                self.allocator.free(buf)
            self.allocated_bytes -= size
            statistics.push_free(size, t.interval)

    def _try_to_free_memory(self):
        """
        Generator releasing every held block to the allocator.

        Yields the number of bytes freed after each released block, then
        yields None once all bins are empty.
        """
        for bin_list in self.bin_nr_to_bin.values():
            if not bin_list:
                continue
            # all blocks of a bin share the same size
            size = bin_list[0].size
            stat_nr = bitlog2(size)
            statistics = self.alloc_statistics[stat_nr]
            while bin_list:
                block = bin_list.pop()
                if self.verbose:
                    msg = "{} freeing block of size {}."
                    msg = msg.format(self.header(), bytes2str(size, decimal=False))
                    print(msg)
                with Timer() as t:
                    self.allocator.free(block)
                self.allocated_bytes -= size
                statistics.push_free(size, t.interval)
                yield size
        yield None

    def allocation_report(self):
        """
        Returns various statistics of this pool as a string.

        Also asserts that the counters gathered in alloc_statistics agree
        with active_blocks, held_blocks and allocated_bytes.
        """
        stats = self.alloc_statistics.values()
        nrequests = sum(v.nrequests for v in stats)
        nallocs = sum(v.nallocs for v in stats)
        nreuses = sum(v.nreuses for v in stats)
        nreturns = sum(v.nreturns for v in stats)
        nfrees = sum(v.nfrees for v in stats)
        ballocs = sum(v.ballocs for v in stats)
        breuses = sum(v.breuses for v in stats)
        bfrees = sum(v.bfrees for v in stats)
        breturns = sum(v.breturns for v in stats)

        active_blocks = self.active_blocks
        held_blocks = self.held_blocks
        allocated_bytes = self.allocated_bytes

        assert active_blocks + held_blocks == nallocs - nfrees
        assert allocated_bytes == ballocs - bfrees
        assert active_blocks == nallocs + nreuses - nreturns
        assert held_blocks == nreturns - nreuses - nfrees
        assert nrequests == nallocs + nreuses

        # column width = number of digits of the largest non-zero counter
        # (ceil(log10(n)) was one digit short for exact powers of ten)
        counters = (active_blocks, held_blocks, nrequests, nallocs, nreuses, nfrees)
        width = max((len(str(n)) for n in counters if n), default=0)

        def counter_line(count, label, nbytes):
            # one "<count> <label> <binary size> (<decimal size>)" line
            return "\n {:>{width}} {} {} ({})".format(
                count,
                label,
                bytes2str(nbytes, decimal=False),
                bytes2str(nbytes),
                width=width,
            )

        ss = f"== Memory pool {self.name} allocation report =="
        ss += "\n Global pool statistics:"
        ss += counter_line(active_blocks, "blocks active", ballocs + breuses - breturns)
        ss += counter_line(held_blocks, "blocks held", breturns - breuses - bfrees)
        ss += "\n"
        ss += counter_line(nrequests, "blocks requested", ballocs + breuses)
        ss += counter_line(nreuses, "blocks reused", breuses)
        ss += counter_line(nallocs, "blocks allocated", ballocs)
        ss += counter_line(nfrees, "blocks freed", bfrees)
        ss += "\n"

        ss += "\n Detailed pool statistics:"
        if self.alloc_statistics:
            # statistics key k gathers sizes x with bitlog2(x) == k,
            # i.e. 2**k <= x < 2**(k+1)
            for stat_nr, stat in sorted(self.alloc_statistics.items()):
                ss += "\n {:>10} <= x < {:<10} => {}".format(
                    bytes2str(2**stat_nr, decimal=False),
                    bytes2str(2 ** (stat_nr + 1), decimal=False),
                    stat,
                )
        else:
            ss += "\n *no allocated blocks*"

        ss += "\n\n Held blocks:"
        has_block = False
        for bin_nr in sorted(self.bin_nr_to_bin.keys()):
            blocks = self.bin_nr_to_bin[bin_nr]
            nblocks = len(blocks)
            if nblocks == 0:
                continue
            has_block = True
            mean_bytes = sum(b.size for b in blocks) / float(nblocks)
            ss += "\n *bin {}: nblocks={} mean_block_size={}".format(
                bin_nr, nblocks, bytes2str(mean_bytes)
            )
        if not has_block:
            ss += "\n *no held blocks, all blocks are in use*"
        ss += "\n=========================================="
        return ss

    def print_allocation_report(self):
        """
        Print various statistics of this pool.
        """
        print()
        print(self.allocation_report())
        print()
class PoolAllocationStatistics:
    """
    Allocation counters kept by a memory pool for one statistics key.

    ``n*`` attributes count blocks, ``b*`` attributes count bytes and
    ``t*`` attributes accumulate the durations given to push_alloc and
    push_free.
    """

    def __init__(self):
        # block counters
        self.nrequests = self.nallocs = self.nfrees = 0
        self.nreturns = self.nreuses = 0
        # byte counters
        self.ballocs = self.bfrees = self.breuses = self.breturns = 0
        # accumulated timings
        self.tallocs = self.tfrees = 0

    def push_alloc(self, size, t):
        """Record a request served by a fresh allocation of size bytes taking t."""
        self.nrequests += 1
        self.nallocs += 1
        self.ballocs += size
        self.tallocs += t

    def push_reuse(self, size):
        """Record a request served by reusing a held block of size bytes."""
        assert self.nallocs > 0
        self.nrequests += 1
        self.nreuses += 1
        self.breuses += size

    def push_return(self, size):
        """Record a block of size bytes given back to the pool."""
        assert self.nallocs > 0
        self.nreturns += 1
        self.breturns += size

    def push_free(self, size, t):
        """Record a block of size bytes released to the allocator, taking t."""
        assert self.nallocs > 0
        self.nfrees += 1
        self.bfrees += size
        self.tfrees += t

    def reuse_factor(self):
        """Fraction of requests served by reuse (0 when nothing was allocated)."""
        return float(self.nreuses) / self.nrequests if self.nallocs > 0 else 0

    def mean_alloc_time(self):
        """Mean duration of fresh allocations (0 when there were none)."""
        return self.tallocs / self.nallocs if self.nallocs > 0 else 0

    def mean_free_time(self):
        """Mean duration of frees (0 when there were none)."""
        return self.tfrees / self.nfrees if self.nfrees > 0 else 0

    def __str__(self):
        if self.nreuses:
            reuse = f"{int(self.reuse_factor()*1000)/10.0}%"
        else:
            reuse = " no"
        alloc_time = time2str(self.mean_alloc_time(), on_zero=" no")
        free_time = time2str(self.mean_free_time(), on_zero=" no")
        return "{:>4} requests | {:>4} reuse | {} allocs | {} frees".format(
            self.nrequests, reuse, alloc_time, free_time
        )